-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add DLQ for reprocessing of failed blocks #61
Add DLQ for reprocessing of failed blocks #61
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. Join @adammilnesmith and the rest of your teammates on Graphite |
9fa28e3
to
623069c
Compare
623069c
to
4401b67
Compare
@@ -91,6 +91,9 @@ func (i *ingester) trySendBlockBatch( | |||
for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] { | |||
// Skip Failed block if we're configured to skip Failed blocks | |||
if i.cfg.SkipFailedBlocks && block.Errored() { | |||
i.log.Error("SendBlocks: RPCBlock has an error, requeueing...", "block", block.BlockNumber, "error", block.Error) | |||
i.dlq.AddBlock(block.BlockNumber, 0) | |||
delete(collectedBlocks, nextBlockToSend) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug fix: This delete
should also fix a memory leak in collectedBlocks
that would have existed when SkipFailedBlocks
was enabled
@@ -110,7 +113,7 @@ func (i *ingester) trySendBlockBatch( | |||
|
|||
// Send the batch | |||
lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber | |||
if lastBlockNumber != nextBlockToSend-1 { | |||
if !i.cfg.SkipFailedBlocks && lastBlockNumber != nextBlockToSend-1 { | |||
panic("unexpected last block number") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug fix: This assertion only works when we aren't skipping failed blocks. If we skip the last block that would have gone into a batch then we were panicking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job 🥇 Mostly some questions
9760102
to
a90ff9b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! My only comment is that it doesn't make much sense to add the type parameter T
in the DLQ.
a90ff9b
to
a09b211
Compare
"elapsed", time.Since(startTime), | ||
"error", err, | ||
) | ||
if !i.cfg.SkipFailedBlocks { | ||
return err | ||
} | ||
blocks <- models.RPCBlock{BlockNumber: blockNumber, Error: err} | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bug fix: this should avoid a rare hang during an attempted clean shutdown if the context closes when we are writing to the channel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LFG 🔥
Nice bug fixes! |
Awesome work, well done! It's not clear to me why we need to duplicate so much of the sending code, though -- would love to hear more about that when you're back |
We will be able to dedup some of this but the short version is: for the DLQ we have wrapping types and sending without batches or ordering. |
Great, thanks! |
Adds a DLQ for block gaps and failed blocks to be reprocessed as per internal design doc.
Incidental bug fixes:
collectedBlocks
that would have existed whenSkipFailedBlocks
was enabled